#include "configure.h"

#include <pthread.h>
#include <list.h>

#include "thread_pool.h"
#include "dbg.h"


static inline int __thread_is_stop(tp_thread_t *self)
{
        return self->status == __TH_STOP__;
}

static inline int __thread_is_pause(tp_thread_t *self)
{
        return self->status == __TH_PAUSE__;
}

static int __thread_create(tp_thread_t **thread, thread_pool_t *pool, thread_pool_fn fn, void *arg, int free_arg)
{
        int ret;
        tp_thread_t *_thread;

        *thread = NULL;

        ret = ymalloc((void **)&_thread, sizeof(tp_thread_t));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(_thread, 0x0, sizeof(tp_thread_t));

        INIT_LIST_HEAD(&_thread->hook);

        _thread->pool = pool;
        _thread->status = __TH_STOPPED__;

        _thread->fn = fn;
        _thread->arg = arg;
        _thread->free_arg = free_arg;
        _thread->ret = 0;

        _thread->is_stop = __thread_is_stop;
        _thread->is_pause = __thread_is_pause;

        *thread = _thread;
        return 0;
err_ret:
        return ret;
}

static int __thread_destroy(tp_thread_t **thread)
{
        tp_thread_t *_thread = *thread;

        if (_thread) {
                if (_thread->free_arg && _thread->arg) {
                        yfree((void **)&_thread->arg);
                }

                yfree((void **)&_thread);
        }

        return 0;
}

/// -----------------------------------------------------------------
/// THREAD POOL
/// -----------------------------------------------------------------

static void *__thread_proc(void *arg)
{
        int ret;
        void *res;
        tp_thread_t *thread = arg;
        thread_pool_t *pool = thread->pool;

        thread->status = __TH_RUNNING__;

        pool->change_running(pool, 1);

        while (1) {
                if (thread->is_stop(thread)) {
                        break;
                }

                if (thread->is_pause(thread)) {
                        continue;
                }

                res = thread->fn(thread);
                if (unlikely(thread->ret)) {
                        ret = thread->ret;
                        GOTO(err_ret, ret);
                }
        }

        thread->status = __TH_STOPPED__;
        pool->change_running(pool, -1);

        return NULL;
err_ret:
        thread->status = __TH_STOPPED__;
        pool->change_running(pool, -1);

        return res;
}

static int __thread_pool_add(thread_pool_t *pool, thread_pool_fn fn, void *arg, int free_arg)
{
        int ret;
        tp_thread_t *thread;

        ret = __thread_create(&thread, pool, fn, arg, free_arg);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        count_list_add_tail(&thread->hook, &pool->threads);
        return 0;
err_ret:
        return ret;
}

static int __thread_pool_start(thread_pool_t *pool)
{
        int ret;
        pthread_t th;
        pthread_attr_t ta;
        tp_thread_t *thread;

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        struct list_head *pos, *n;
        list_for_each_safe(pos, n, &pool->threads.list) {
                thread = list_entry(pos, tp_thread_t, hook);

                ret = pthread_create(&th, &ta, __thread_proc, thread);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                thread->th = th;
        }

        return 0;
}

static int __thread_pool_stop(thread_pool_t *pool)
{
        tp_thread_t *thread;
        struct list_head *pos, *n;

        list_for_each_safe(pos, n, &pool->threads.list) {
                thread = list_entry(pos, tp_thread_t, hook);
                thread->status = __TH_STOP__;
        }

        return 0;
}

static int __thread_pool_is_stop(thread_pool_t *pool, int *stop)
{
        int ret;

        *stop = 0;

        ret = pthread_rwlock_rdlock(&pool->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (pool->running == 0) {
                *stop = 1;
        }

        pthread_rwlock_unlock(&pool->lock);
        return 0;
err_ret:
        return ret;
}

static int __thread_pool_ensure_stop(thread_pool_t *pool)
{
        int ret, stop, retry = 0;

        ret = pool->stop(pool);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        while (TRUE) {
                ret = pool->is_stop(pool, &stop);
                if (unlikely(ret)) {
                        usleep(100 * 1000);
                        continue;
                }

                if (!stop) {
                        retry++;
                        DINFO("retry %d\n", retry);
                        usleep(1000 * 1000);
                }

                break;
        }

        return 0;
err_ret:
        return ret;
}

static inline int __thread_pool_change_running(thread_pool_t *pool, int n)
{
        int ret;

        ret = pthread_rwlock_wrlock(&pool->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        pool->running += n;

        // TODO ylib/lib/sysutil.c:570 _sem_timedwait1 ERROR: !!!!!!!!!!assert fail!!!!!!!!!!!!!!!
        DINFO("running thread %d\n", pool->running);

        pthread_rwlock_unlock(&pool->lock);
        return 0;
err_ret:
        return ret;
}

int thread_pool_join(thread_pool_t *pool)
{
        tp_thread_t *thread;
        struct list_head *pos, *n;

        list_for_each_safe(pos, n, &pool->threads.list) {
                thread = list_entry(pos, tp_thread_t, hook);

                pthread_join(thread->th, NULL);
        }

        return 0;
}

int thread_pool_init(thread_pool_t *pool)
{
        int ret;

        count_list_init(&pool->threads);
        pool->running = 0;

        ret = pthread_rwlock_init(&pool->lock, NULL);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        pool->add = __thread_pool_add;

        pool->start = __thread_pool_start;
        pool->stop = __thread_pool_stop;
        pool->is_stop = __thread_pool_is_stop;
        pool->ensure_stop = __thread_pool_ensure_stop;
        pool->change_running = __thread_pool_change_running;

        YASSERT(pool->threads.count == 0);

        return 0;
}

int thread_pool_destroy(thread_pool_t *pool)
{
        struct list_head *pos, *n;
        tp_thread_t *thread;

        YASSERT(pool->running == 0);

        list_for_each_safe(pos, n, &pool->threads.list) {
                thread = list_entry(pos, tp_thread_t, hook);

                count_list_del_init(pos, &pool->threads);
                __thread_destroy(&thread);
        }

        YASSERT(pool->threads.count == 0);

        pthread_rwlock_destroy(&pool->lock);

        return 0;
}

/// -----------------------------------------------------------------
/// TEST
/// -----------------------------------------------------------------

typedef struct {
        int x;
} __my_arg_t;

static void *__my_thread_proc(void *arg)
{
        tp_thread_t *thread = arg;
        __my_arg_t *my_arg = thread->arg;

        while (TRUE) {
                if (thread->is_stop(thread))
                        break;

                DINFO("x = %d\n", my_arg->x);
                sleep(1);
        }

        return NULL;
}

int thread_pool_test()
{
        int ret;
        thread_pool_t pool;
        __my_arg_t *arg;

        ret = thread_pool_init(&pool);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        for (int i=0; i < 10; i++) {
                ret = ymalloc((void **)&arg, sizeof(__my_arg_t));
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }

                arg->x = i;

                ret = pool.add(&pool, __my_thread_proc, arg, TRUE);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        }

        pool.start(&pool);

        sleep(5);

        pool.ensure_stop(&pool);

        DINFO("destroy\n");

        thread_pool_destroy(&pool);

        return 0;
err_ret:
        return ret;
}
